//! # RPC 工具和结构体
//!
//! 这个模块包含了一些用于处理RPC（远程过程调用）的实用工具和结构体。
//! 它使用了多个库，包括`anyhow`、`flume`、`futures-lite`、`futures-util`、`quic_rpc`、`tokio`和`tracing`。
//!
//! ## 主要组件和功能
//!
//! - **导入和重导出**：使用`pub use`语句重导出了一些库中的类型和函数，以便在其他模块中直接使用。
//! - **`GetServiceHandler` trait**：定义了一个trait，用于获取特定服务的处理程序。
//! - **`ServiceHandler` trait**：定义了一个trait，用于服务的处理程序。
//! - **`run_server` 函数**：用于运行一个RPC服务器。
//! - **`ClientStreamingResponse` 结构体**：用于处理客户端流式RPC响应。
//! - **`ServerStreamingResponse` 结构体**：用于处理服务器流式RPC响应。
//!
//! ## 注意事项
//!
//! - **异步处理**：代码中大量使用了异步处理，包括`async/await`关键字和`Future` trait。
//! - **错误处理**：代码中使用了`Result`和`Error`类型进行错误处理。
//! - **并发**：代码中使用了`Arc`和`LazyLock`来处理并发访问和延迟初始化。
//! - **类型安全**：通过trait和泛型，代码确保了类型安全。
//!
//! ## 用途
//!
//! 这个模块给微服务框架导出了一些工具函数和结构体，可以处理不同类型的RPC请求，包括客户端流式和服务器流式请求。
//! 它提供了基本的错误处理和并发支持，使得开发者可以更容易地构建高性能的RPC服务。

mod error;
mod transport;

pub use error::{QuicRpcWrapError, Result};
pub use flume::bounded as flume_bounded;
use futures_lite::future::Boxed;
pub use futures_lite::stream::{Stream, StreamExt};
use futures_util::SinkExt;
#[cfg(feature = "quinn")]
pub use iroh_quinn::{
    ClientConfig, Endpoint as QuinnEndpoint, ServerConfig,
    crypto::rustls::{QuicClientConfig, QuicServerConfig},
    rustls::{
        RootCertStore,
        pki_types::{CertificateDer, PrivatePkcs8KeyDer},
        version::TLS13,
    },
};
#[cfg(feature = "flume")]
pub use quic_rpc::transport::flume::channel as flume_channel;
#[cfg(feature = "hyper")]
pub use quic_rpc::transport::hyper::{HyperConnector, HyperListener};
#[cfg(feature = "quinn")]
pub use quic_rpc::transport::quinn::{QuinnConnector, QuinnListener};
pub use quic_rpc::{
    Connector, Listener, RpcClient, RpcMessage, RpcServer, Service,
    client::{BoxStreamSync, BoxedConnector, UpdateSink},
    message::{
        BidiStreaming, BidiStreamingMsg, ClientStreaming, ClientStreamingMsg, Msg, RpcMsg,
        ServerStreaming, ServerStreamingMsg,
    },
    server::{BoxedChannelTypes, BoxedListener, ChannelTypes, RpcChannel},
};
#[cfg(feature = "quinn")]
use std::{
    fs::File,
    io::{Read, Write},
    path::Path,
};
use std::{
    future::Future,
    marker::PhantomData,
    mem::replace,
    pin::Pin,
    sync::{Arc, LazyLock},
};
use tokio::runtime::Builder;
pub use tokio::{pin, runtime::Runtime, sync::oneshot::channel as oneshot_channel};
use tracing::{debug, error, warn};
#[cfg(feature = "pipe")]
pub use transport::pipe::{PipeConnector, PipeListener};
#[cfg(feature = "iroh")]
pub use {
    iroh::{Endpoint as IrohEndpoint, NodeAddr, SecretKey},
    quic_rpc::transport::iroh::{IrohConnector, IrohListener},
};

/// 获取特定服务的处理程序
pub trait GetServiceHandler<S: Service> {
    fn get_handler(self: Arc<Self>) -> Arc<S>;
}

/// 服务的处理程序
pub trait ServiceHandler<S: Service, C: ChannelTypes<S> = BoxedChannelTypes<S>> {
    /// 用于服务端处理请求和响应。
    ///
    /// # Arguments
    ///
    /// * `req`: 请求参数。
    /// * `chan`: 连接通道。
    /// * `rt`: 异步运行时。
    ///
    /// returns: impl Future<Output=Result<()>>+Send+Sized 是否处理成功。
    ///
    /// # Examples
    ///
    /// ```
    /// None::<()>;
    /// ```
    fn handle_rpc_request(
        self: Arc<Self>,
        req: S::Req,
        chan: RpcChannel<S, C>,
        rt: &'static Runtime,
    ) -> impl Future<Output = Result<()>> + Send;
}

pub static TIME: std::sync::LazyLock<std::time::Instant> =
    std::sync::LazyLock::new(std::time::Instant::now);

pub async fn run_server<S, L>(server: RpcServer<S, L>)
where
    L: Listener<S>,
    S: Service + Default + ServiceHandler<S>,
{
    let service = Arc::new(S::default());
    debug!("{:?}", service);
    static RT: LazyLock<Runtime> =
        LazyLock::new(|| Builder::new_multi_thread().enable_all().build().unwrap());
    loop {
        let Ok(accepting) = server.accept().await else {
            continue;
        };

        match accepting.read_first().await {
            Err(err) => warn!(?err, "server accept failed"),
            Ok((req, chan)) => {
                let handler = service.clone();
                RT.spawn(async move {
                    if let Err(err) =
                        S::handle_rpc_request(handler, req, chan.map().boxed(), &*RT).await
                    {
                        warn!(?err, "internal rpc error");
                    }
                });
            }
        }
    }
}

pub struct ClientStreamingResponse<T, S, C, R>(
    Option<UpdateSink<C, T>>,
    Boxed<Result<R>>,
    PhantomData<S>,
)
where
    S: Service,
    C: Connector<S>,
    T: Into<C::Out>;

impl<T, S, C, R> ClientStreamingResponse<T, S, C, R>
where
    S: Service,
    C: Connector<S>,
    T: Into<C::Out>,
{
    pub fn new(
        sink: UpdateSink<C, T>,
        result: impl Future<Output = Result<R>> + Send + 'static,
    ) -> Self {
        Self(sink.into(), Box::pin(result) as _, Default::default())
    }

    pub async fn put(&mut self, item: T) -> &mut Self {
        let Some(sink) = self.0.as_mut() else {
            return self;
        };
        if let Err(e) = sink.send(item).await {
            error!("Send data error. ({})", e);
        }
        self
    }

    pub async fn result(&mut self) -> Result<R> {
        let Some(mut sink) = replace(&mut self.0, None) else {
            return Err(QuicRpcWrapError::BadSink);
        };
        sink.close()
            .await
            .map_err(|e| QuicRpcWrapError::Send(e.to_string()))?;
        drop(sink);
        replace(
            &mut self.1,
            Box::pin(async { Err(QuicRpcWrapError::ResultAlreadyTakenAway) }) as _,
        )
        .await
    }
}

pub struct ServerStreamingResponse<R>(Pin<Box<dyn Stream<Item = Result<R>> + Send>>);

impl<R> ServerStreamingResponse<R> {
    pub fn new(stream: impl Stream<Item = Result<R>> + Send + 'static) -> Self {
        Self(Box::pin(stream) as _)
    }

    pub async fn next(&mut self) -> Option<Result<R>> {
        self.0.next().await
    }
}

unsafe impl<R> Send for ServerStreamingResponse<R> {}

//noinspection SpellCheckingInspection
/// 生成服务器证书和私钥。
///
/// 这个函数使用`rcgen`库生成一个自签名证书和对应的私钥。
///
/// # 参数
///
/// * `subject_alt_names` - 一个字符串切片，包含主题备用名称（Subject Alternative Names）。
///
/// # 返回值
///
/// 返回一个包含证书和私钥字节数组的元组。如果生成证书失败，将返回一个错误。
///
/// # 示例
///
/// ```rust
/// use std::error::Error;
///
/// #[cfg(feature = "quinn")]
/// fn main() -> Result<(), Box<dyn Error>> {
///     let subject_alt_names = vec!["example.com", "localhost"];
///     let (cert, key) = quic_rpc_utils::gen_server_cert(&subject_alt_names)?;
///     println!("Certificate: {:?}", cert);
///     println!("Private Key: {:?}", key);
///     Ok(())
/// }
/// ```
///
/// # 注意
///
/// 1. 生成的证书仅用于开发和测试，不应在生产环境中使用。
/// 2. 确保在编译时启用了`quinn`特性。
///
/// # 错误处理
///
/// 如果生成证书失败，将返回一个错误。
#[cfg(feature = "quinn")]
pub fn gen_server_cert(subject_alt_names: &[&str]) -> Result<(Vec<u8>, Vec<u8>)> {
    let rcgen::CertifiedKey { cert, signing_key } = rcgen::generate_simple_self_signed(
        subject_alt_names
            .iter()
            .map(|i| i.to_string())
            .collect::<Vec<_>>(),
    )?;
    let cert_der = cert.der();
    let private_key = signing_key.serialize_der();
    Ok((cert_der.to_vec(), private_key))
}

/// 将证书文件和私钥文件保存到指定的文件路径中。
///
/// # 参数
///
/// - `cert_der: &[u8]`：证书文件的DER编码格式数据。
/// - `private_key: &[u8]`：私钥文件的数据。
/// - `cert_der_file: impl AsRef<Path>`：证书文件保存路径。
/// - `private_key_file: impl AsRef<Path>`：私钥文件保存路径。
///
/// # 返回值
///
/// - `Result<()>`：如果保存成功，返回`Ok(())`；如果保存失败，返回`Err`。
///
/// # 示例
///
/// ```rust
/// use std::path::Path;
/// use quic_rpc_utils::save_cert_file;
///
/// let cert_der = vec![0x30, 0x82, 0x01, 0x22, /* ... */];
/// let private_key = vec![0x30, 0x82, 0x01, 0x22, /* ... */];
/// let cert_der_file = Path::new("path/to/cert.crt");
/// let private_key_file = Path::new("path/to/private_key.key");
///
/// if let Err(e) = save_cert_file(&cert_der, &private_key, cert_der_file, private_key_file) {
///     eprintln!("保存文件失败: {}", e);
/// }
/// ```
#[cfg(feature = "quinn")]
pub fn save_cert_file(
    cert_der: &[u8],
    private_key: &[u8],
    cert_der_file: impl AsRef<Path>,
    private_key_file: impl AsRef<Path>,
) -> Result<()> {
    File::create(cert_der_file)?.write_all(cert_der)?;
    File::create(private_key_file)?.write_all(private_key)?;
    Ok(())
}

/// 从指定的文件路径中读取证书文件和私钥文件的数据。
///
/// # 参数
///
/// - `cert_der_file: impl AsRef<Path>`：证书文件路径。
/// - `private_key_file: impl AsRef<Path>`：私钥文件路径。
///
/// # 返回值
///
/// - `Result<(Vec<u8>, Vec<u8>)>`：如果读取成功，返回包含证书文件和私钥文件数据的元组；如果读取失败，返回`Err`。
///
/// # 示例
///
/// ```rust
/// use std::path::Path;
/// use quic_rpc_utils::read_cert_file;
///
/// let cert_der_file = Path::new("path/to/cert.der");
/// let private_key_file = Path::new("path/to/private_key.pem");
///
/// match read_cert_file(cert_der_file, private_key_file) {
///     Ok((cert_der, private_key)) => {
///         println!("读取的证书文件数据: {:?}", cert_der);
///         println!("读取的私钥文件数据: {:?}", private_key);
///     }
///     Err(e) => {
///         eprintln!("读取文件失败: {}", e);
///     }
/// }
/// ```
#[cfg(feature = "quinn")]
pub fn read_cert_file(
    cert_der_file: impl AsRef<Path>,
    private_key_file: impl AsRef<Path>,
) -> Result<(Vec<u8>, Vec<u8>)> {
    let (mut cert_der, mut key) = Default::default();
    File::open(cert_der_file)?.read_to_end(&mut cert_der)?;
    File::open(private_key_file)?.read_to_end(&mut key)?;
    Ok((cert_der, key))
}

/// 返回默认服务器配置
#[cfg(feature = "quinn")]
#[allow(clippy::field_reassign_with_default)] // https://github.com/rust-lang/rust-clippy/issues/6527
pub fn configure_server(
    max_concurrent_uni_streams: u8,
    cert_der: Vec<u8>,
    private_key: Vec<u8>,
) -> Result<ServerConfig> {
    let private_key = PrivatePkcs8KeyDer::from(private_key);
    let cert_chain = vec![CertificateDer::from(cert_der)];

    let crypto_server_config = iroh_quinn::rustls::ServerConfig::builder_with_provider(Arc::new(
        iroh_quinn::rustls::crypto::ring::default_provider(),
    ))
    .with_protocol_versions(&[&TLS13])
    .expect("valid versions")
    .with_no_client_auth()
    .with_single_cert(cert_chain, private_key.into())?;
    let quic_server_config = QuicServerConfig::try_from(crypto_server_config)?;
    let mut server_config = ServerConfig::with_crypto(Arc::new(quic_server_config));

    Arc::get_mut(&mut server_config.transport)
        .unwrap()
        .max_concurrent_uni_streams(max_concurrent_uni_streams.into());

    Ok(server_config)
}

/// 构建默认的 quinn 客户端配置并信任给定的证书。
///
/// ## Args
///
/// - `server_certs`: DER 格式的受信任证书列表。
#[cfg(feature = "quinn")]
pub fn configure_client(server_certs: &[&[u8]]) -> Result<ClientConfig> {
    let mut certs = RootCertStore::empty();
    for cert in server_certs {
        let cert = CertificateDer::from(cert.to_vec());
        certs.add(cert)?;
    }

    let crypto_client_config = iroh_quinn::rustls::ClientConfig::builder_with_provider(Arc::new(
        iroh_quinn::rustls::crypto::ring::default_provider(),
    ))
    .with_protocol_versions(&[&TLS13])
    .expect("valid versions")
    .with_root_certificates(certs)
    .with_no_client_auth();
    let quic_client_config = QuicClientConfig::try_from(crypto_client_config)?;

    Ok(ClientConfig::new(Arc::new(quic_client_config)))
}
